Flink 启动流程 您所在的位置:网站首页 Flink 启动流程之 yarn Flink 启动流程

Flink 启动流程

2024-07-14 06:46| 来源: 网络整理| 查看: 265

Flink 1.10

前面几篇文章从各个组件的角度阐述了 Flink 任务的启动,本节将以 Flink On Yarn 的 Per Job 模式将所有知识点串起来。

以下从本地和集群两个角度来说明

本地 flink run

当执行上面的命令后,开始任务的提交:

Flink 启动流程之 flink run 中最后是去执行任务的代码 深入剖析 Flink Straming WC流程 中生成 JobGraph,并提交到集群;不同之处在于本例是 Yarn 集群

下面来说说在 Yarn 集群上,JobGraph 是如何提交的。此时的 Executor 是 YarnJobClusterExecutor,由于生成 StreamGraph 代码是相同的,直接从 YarnJobClusterExecutor 开始 (直接执行 flink run 是跑在yarn 集群的 per job任务)。

// org.apache.flink.yarn.executors.YarnJobClusterExecutor public class YarnJobClusterExecutor extends AbstractJobClusterExecutor { public static final String NAME = "yarn-per-job"; public YarnJobClusterExecutor() { super(new YarnClusterClientFactory()); } } // org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor public class AbstractJobClusterExecutor implements PipelineExecutor { private static final Logger LOG = LoggerFactory.getLogger(AbstractJobClusterExecutor.class); private final ClientFactory clusterClientFactory; public AbstractJobClusterExecutor(@Nonnull final ClientFactory clusterClientFactory) { this.clusterClientFactory = checkNotNull(clusterClientFactory); } @Override public CompletableFuture execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception { // 生成 jobgraph 过程后续介绍 final JobGraph jobGraph = ExecutorUtils.getJobGraph(pipeline, configuration); // 获取 YarnClusterDescriptor,deployJobCluster 就提交 jobGraph 到yarn // 这里可以去看看 yarn-seesion 的启动流程,clusterClientFactory = YarnClusterClientFactory try (final ClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) { final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration); // 计算内存,slot final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration); final ClusterClientProvider clusterClientProvider = clusterDescriptor .deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode()); LOG.info("Job has been submitted with JobID " + jobGraph.getJobID()); return CompletableFuture.completedFuture( new ClusterClientJobClientAdapter(clusterClientProvider, jobGraph.getJobID())); } } } // org.apache.flink.yarn.YarnClusterDescriptor#deployJobCluster public ClusterClientProvider deployJobCluster( ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached) throws ClusterDeploymentException { try { return deployInternal( clusterSpecification, "Flink per-job cluster", getYarnJobClusterEntrypoint(), jobGraph, detached); } catch (Exception e) { throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e); } } -> startAppMaster // org.apache.flink.yarn.YarnClusterDescriptor#startAppMaster // 以下是 per-job 流程 一:初始化 HDFS,获取 HDFS 目录(当前用户在 hsfs 上的目录) 二:收集要上传到 HDFS 的文件 (systemShipFiles): flink jar、配置文件等 三:用户缓存文件上传到 HDFS,并将目录保存到 jobGraph 四:上传之前收集文件目录上传到 HDFS 五:上传 flink-conf 配置文件 六:per-job 模式下:jobGraph 写入文件上传到 HDFS 七:准备 AppMasterContainer 八:构造 appMasterEnv 九:向 yarn 提交任务,死等 yarn 返回任务状态: running:正常提交 finish:任务执行完毕 FAILED/KILLED:提交异常

至此本地的提交流程结束,结合源码可以封装个自己的 Yarn 提交代码。

集群

clusterDescriptor.deployJobCluster 执行时会执行 ClusterEntrypoint。Flink 启动流程之 JobManager 中介绍的是 Standalone 模式下,作为对比看看 YarnJobClusterEntrypoint 有什么不同。

// org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint // ------------------------------------------------------------------------ // The executable entry point for the Yarn Application Master Process // for a single Flink job. // ------------------------------------------------------------------------ public class YarnJobClusterEntrypoint extends JobClusterEntrypoint { protected DefaultDispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) throws IOException { return DefaultDispatcherResourceManagerComponentFactory.createJobComponentFactory( YarnResourceManagerFactory.getInstance(), // 获取 JobGraph HDFS 地址 FileJobGraphRetriever.createFrom(configuration, getUsrLibDir(configuration))); } ... // Standalone 模式下相同步骤 }

集群上主要是启动 WebMonitorEndpoint、ResourceManager、DispatcherRunner 组件;DispatcherRunner 根据获取到的 JobGraph 启动 JobMaster;JobMaster 将 JobMaster 生成 ExecutionGraph ,向 ResourceManager 申请 slot 资源开始部署 task。

最新版本1.11支持 Application 模式,可以将 JobGraph 生成也放到集群,减少本地压力。

总结

以下内容摘抄参考资料一

本地流程 与 Session-Cluster 模式类似,入口也为CliFrontend#main 解析处理参数 根据用户 jar、main、程序参数、savepoint 信息生成 PackagedProgram 根据 PackagedProgram 创建 JobGraph(对于非分离模式还是和Session模式一样,模式Session-Cluster) 获取集群资源信息(createClusterDescriptor,getClusterSpecification) 部署集群 YarnClusterDesriptor#deployJobCluster -> AbstractYarnClusterDescriptor#deployInternal;后面流程与Session-Cluster类似,值得注意的是在 AbstractYarnClusterDescriptor#startAppMaster 中与 Session-Cluster 有一个显著不同的就是其会将任务的JobGraph上传至Hdfs供后续服务端使用 初始化文件系统(HDFS) 将 log4j、logback、flink-conf.yaml、jar 包上传至 HDFS 构造 AppMaster 的 Container(确定 Container 进程的入口类 YarnJobClusterEntrypoint ),构造相应的 Env YarnClient 向 Yarn 提交 Container 申请 跟踪 ApplicationReport 状态(确定是否启动成功,可能会由于资源不够,一直等待) 启动成功后将对应的 ip 和 port 写入 flinkConfiguration 中 创建与将集群交互的 ClusterClient 远端流程 远端宿主在 Container 中的集群入口为YarnJobClusterEntrypoint#main ClusterEntrypoint#runClusterEntrypoint -> ClusterEntrypoint#startCluster启动集群 创建 JobDispatcherResourceManagerComponentFactory(用于创建JobDispatcherResourceManagerComponent) 创建 ResourceManager(YarnResourceManager)、Dispatcher(MiniDispatcher),其中在创建 MiniDispatcher 时会从之前的JobGraph文件中读取出JobGraph,并启动进行ZK选举 当为主时会调用Dispatcher#grantLeadership方法 Dispatcher#recoverJobs 恢复任务,获取 JobGraph Dispatcher#tryAcceptLeadershipAndRunJobs 确认获取主并开始运行任务 Dispatcher#runJob 开始运行任务 ==> 创建 JobManagerRunner 并启动 创建并 JobMaster 生成 ExecutionGraph,向 ResourceManager 申请 slot 部署 参考资料

【Flink】深入理解Flink-On-Yarn模式

一张图轻松掌握 Flink on YARN 基础架构与启动流程

Previous Flink 启动流程之 flink run Next Flink 源码之构建 StreamGraph


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有